package priv.lhy.ecm.collectorProducer.aftertreatment.reupload.impl;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.util.Attribute;
import io.netty.util.AttributeKey;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import priv.lhy.common.utils.DateUtil;
import priv.lhy.ecm.collectorProducer.aftertreatment.reupload.IReupload;
import priv.lhy.ecm.collectorProducer.constants.ExceptionType;

import java.io.*;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

/**
 * @author: lihy
 * date: 2019/7/3 10:03
 * description: Re-uploads records that were kept in local files. Walks the directory tree
 * under {@code upload.keepRecord.path.prefix}, sends every line of every eligible file over
 * a TCP connection to {@code netty.tcp.host:netty.tcp.port}, and deletes a file once all
 * of its lines have been sent.
 */
@Component("reuploadByFile")
public class ReuploadByFile implements IReupload{

    /** Length of the leading {@code yyyyMMdd} date in the timestamp segment of a file name. */
    private static final int DATE_PREFIX_LENGTH = 8;

    @Value("${upload.keepRecord.path.prefix}")
    private String filePathPrefix;

    @Value("${netty.tcp.host}")
    private String tcpHost;

    @Value("${netty.tcp.port}")
    private int tcpPort;

    @Autowired
    Bootstrap bootstrap;

    /**
     * Scans the configured record directory and re-uploads every eligible file.
     * Does nothing when the directory does not exist.
     */
    @Override
    public void sendMsg(){
        File reuploadPath = new File(filePathPrefix);
        if (reuploadPath.exists()) {
            getFile(reuploadPath);
        }
    }

    /**
     * Recursively processes all files below {@code parent}.
     *
     * <p>Directories named like {@link ExceptionType#MANUAL_ERROR} (case-insensitive) are
     * skipped entirely. Files whose name does not carry a date in its third
     * {@code _}-separated segment, or whose date is today's, are skipped. Every other file
     * is read completely, sent, and deleted if sending succeeded.
     *
     * @param parent directory to scan; anything that is not a directory is ignored
     */
    private void getFile(File parent) {
        // Skip non-directories and the folder reserved for manually handled records.
        if (!parent.isDirectory()
                || parent.getName().equalsIgnoreCase(ExceptionType.MANUAL_ERROR.name())) {
            return;
        }
        // listFiles() returns null on an I/O error or when the directory cannot be read.
        File[] children = parent.listFiles();
        if (children == null) {
            return;
        }
        String today = DateUtil.dateCustomStr(new Date(), "yyyyMMdd");
        for (File f : children) {
            if (!f.isFile()) {
                getFile(f);
                continue;
            }
            String fileName = f.getName();
            String recordDate = extractRecordDate(fileName);
            if (recordDate == null) {
                // Unexpected file name: leave the file alone instead of aborting the scan.
                continue;
            }
            if (recordDate.equals(today)) {
                // The file dated today is the one currently being written; do not touch it.
                continue;
            }

            List<String> records;
            try {
                // The reader is fully closed here, which is required before deleting the file.
                records = readRecords(f);
            } catch (IOException e) {
                e.printStackTrace();
                // Same as before: a read failure stops processing of this directory.
                return;
            }

            String parentName = f.getParentFile().getName();
            // Delete the file only after all of its records were sent successfully.
            if (sendMsg(parentName, fileName, records) && !f.delete()) {
                System.err.println("Failed to delete re-uploaded file: " + f.getAbsolutePath());
            }
        }
    }

    /**
     * Extracts the {@code yyyyMMdd} prefix of the third {@code _}-separated segment of a
     * file name.
     *
     * @param fileName plain file name (no directory part)
     * @return the first eight characters of the third segment, or {@code null} if the name
     *         has fewer than three segments or the third segment is shorter than eight
     *         characters
     */
    private static String extractRecordDate(String fileName) {
        String[] parts = fileName.split("_");
        if (parts.length < 3 || parts[2].length() < DATE_PREFIX_LENGTH) {
            return null;
        }
        return parts[2].substring(0, DATE_PREFIX_LENGTH);
    }

    /**
     * Reads all lines of a file into memory and closes the file.
     *
     * @param file file to read (decoded with the platform default charset, as before)
     * @return the lines in file order; empty if the file is empty
     * @throws IOException if the file cannot be opened or read
     */
    private static List<String> readRecords(File file) throws IOException {
        List<String> records = new ArrayList<>();
        try (BufferedReader br = new BufferedReader(new FileReader(file))) {
            String line;
            while ((line = br.readLine()) != null) {
                records.add(line);
            }
        }
        return records;
    }

    /**
     * Sends each record over its own TCP connection.
     *
     * <p>Before sending, every {@code "||"} in a record is replaced by
     * {@code "|" + yyyyMMdd + "|"}, where the date comes from the file name (records written
     * during a normal upload carry no upload time). The payload and the relative file path
     * are attached to the channel under the attribute keys {@code "uploadMsg"} and
     * {@code "arg"} so that channel handlers can use them for follow-up processing.
     *
     * @param parentName name of the directory containing the file
     * @param fileName   name of the file the records were read from
     * @param records    lines to send, one message per line
     * @return {@code true} if every record was written and its channel closed without an
     *         exception; {@code false} on any failure or if {@code fileName} carries no date
     */
    private boolean sendMsg(String parentName, String fileName, List<String> records) {
        String recordDate = extractRecordDate(fileName);
        if (recordDate == null) {
            return false;
        }
        // Loop-invariant values are computed once instead of once per record.
        String dateField = "|" + recordDate + "|";
        String sourcePath = parentName + "/" + fileName;
        AttributeKey<String> msgKey = AttributeKey.valueOf("uploadMsg");
        AttributeKey<String> argKey = AttributeKey.valueOf("arg");
        try {
            for (String record : records) {
                Channel channel = bootstrap.connect(tcpHost, tcpPort).sync().channel();
                try {
                    // String.replace returns a new string; the result must be used.
                    String payload = record.replace("||", dateField);

                    // Attach the message so a server-side error can be handled afterwards.
                    channel.attr(msgKey).set(payload);
                    // Attach the file path; used to classify files for manual handling.
                    channel.attr(argKey).set(sourcePath);

                    channel.writeAndFlush(payload);
                    // Sleep 10 ms between records to avoid flooding the server.
                    Thread.sleep(10);
                    // Block until the channel is closed.
                    // NOTE(review): presumably a pipeline handler or the server closes it
                    // after the response is processed - confirm.
                    channel.closeFuture().sync();
                } finally {
                    // Do not leak the connection when something above threw.
                    if (channel.isOpen()) {
                        channel.close();
                    }
                }
            }
            return true;
        } catch (InterruptedException ex) {
            // Preserve the interrupt status for callers higher up the stack.
            Thread.currentThread().interrupt();
            ex.printStackTrace();
            return false;
        } catch (Exception ex) {
            ex.printStackTrace();
            return false;
        }
    }
}
